/**
 * @file write_apm.hpp
 *
 * Write operation with Application Persistency (APM), i.e. an RDMA Write
 * followed by a random RDMA Read that flushes any written data that may still
 * be residing in the RNIC.
 *
 * This implementation is only an RDMA op: it simply overwrites a remote region
 * and makes sure it is persistent on return. No gestalt::dataslot availability
 * check is performed; writing data while expanding or shrinking the value size
 * should be handled by other upper-layer logic.
 */

#pragma once

#include <vector>

#include "internal/ops_base.hpp"
#include "optim.hpp"


namespace gestalt {
namespace ops {

using namespace std;


/**
 * Parallel write with a read-after-write flush.
 *
 * For every target a chain of two work requests is posted on the target's QP:
 * an RDMA Write of #buf (posted without IBV_SEND_SIGNALED) followed by a
 * signaled 1-byte RDMA Read from the same remote address. After all chains are
 * posted, completions are polled until every target has reported or the retry
 * budget is exhausted.
 */
class WriteAPM final : public Base {
public:
    using Base::buf;

    /** One remote destination of the write. */
    struct target_t {
        rdma_cm_id *id;     ///< connection whose QP the WR chain is posted on
        uintptr_t addr;     ///< remote address, used by both the Write and the Read
        uint32_t rkey;      ///< remote key, used by both the Write and the Read
    public:
        target_t(rdma_cm_id *_id, uintptr_t _addr, uint32_t _rkey) noexcept :
            id(_id), addr(_addr), rkey(_rkey)
        { }
    };

private:
    /*
     * Both arrays are zero-initialized so that every field not explicitly set
     * below (wr_id, imm_data, ...) has a defined value when handed to
     * ibv_post_send().
     */
    ibv_sge sgl[2] = {};                ///< [0] write payload, [1] 1-byte flush read
    mutable ibv_send_wr wr[2] = {};     ///< [0] write -> [1] read; patched per target
    vector<target_t> targets;           ///< destinations set by parameterize()

    string opname() const noexcept override
    {
        return "WriteAPM";
    }

    /* c/dtor */
public:
    /**
     * Prepare the static parts of the Write + Read work-request chain.
     *
     * @param pd protection domain, forwarded to Base
     * @param scq send completion queue, forwarded to Base
     */
    WriteAPM(ibv_pd *pd, ibv_cq *scq) : Base(pd, scq)
    {
        /* Write: payload length is filled in by parameterize() */
        sgl[0].addr = reinterpret_cast<uintptr_t>(buf.data());
        sgl[0].lkey = mr->lkey;

        wr[0].next = &wr[1];
        wr[0].sg_list = &sgl[0];
        wr[0].num_sge = 1;
        wr[0].opcode = IBV_WR_RDMA_WRITE;
        wr[0].send_flags = 0;

        /* Flush: read a single byte back into the start of the local buffer */
        sgl[1].addr = reinterpret_cast<uintptr_t>(buf.data());
        sgl[1].length = 1;
        sgl[1].lkey = mr->lkey;

        wr[1].next = nullptr;
        wr[1].sg_list = &sgl[1];
        wr[1].num_sge = 1;
        wr[1].opcode = IBV_WR_RDMA_READ;
        wr[1].send_flags = IBV_SEND_SIGNALED;
    }

    /* interface */
public:
    /** Upper bound on the number of targets accepted by parameterize(). */
    constexpr static unsigned max_expected_replicas = 8;

    /**
     * Set the destinations of the next perform() and size the write payload
     * from the current contents of #buf.
     *
     * @note fill #buf before parameterizing
     * @note the bound on vec.size() is only checked by assert, i.e. not in
     *       builds with NDEBUG defined
     * @param vec targets to write to, 1 to #max_expected_replicas entries
     */
    inline void parameterize(const vector<target_t> &vec) noexcept
    {
        assert(1 <= vec.size() && vec.size() <= max_expected_replicas);

        targets = vec;
        sgl[0].length = buf.slots() * sizeof(dataslot);
    }

    /**
     * Shorthand for parameterize().
     *
     * @param vec see parameterize()
     * @return *this, so that the call can be chained
     */
    inline WriteAPM &operator()(const vector<target_t> &vec) noexcept
    {
        parameterize(vec);
        return *this;
    }

    /**
     * Post the Write + Read chain to every target, then wait for one
     * successful completion per target.
     *
     * @param wr must be this object's own work-request chain (asserted)
     * @param bad_wr receives the failing work request from ibv_post_send()
     * @param wc receives the work completion; on -ECANCELED it holds the
     *           failed completion
     * @return
     * * 0 ok
     * * the non-zero return value of ibv_post_send() if posting failed
     * * the negative return value of ibv_poll_cq() if polling failed
     * * -ECANCELED if a completion reported a status other than success
     * * -ETIME if the retry budget ran out before all completions arrived
     */
    int perform(
        const ibv_send_wr *wr,
        ibv_send_wr* &bad_wr, ibv_wc &wc) const noexcept override
    {
        assert(wr == this->wr);

        /* emit requests: same chain for every target, only remote side differs */

        for (const auto &t : targets) {
            this->wr[0].wr.rdma.remote_addr = t.addr;
            this->wr[0].wr.rdma.rkey = t.rkey;
            this->wr[1].wr.rdma.remote_addr = t.addr;
            this->wr[1].wr.rdma.rkey = t.rkey;
            if (int r = ibv_post_send(t.id->qp, this->wr, &bad_wr); r)
                [[unlikely]] return r;
        }

        /* poll from all channels */

        if constexpr (optimization::batched_poll) {
            ibv_wc wcbuf[max_expected_replicas];
            unsigned remain = targets.size();
            for (unsigned retry = max_poll; remain && (false || retry); --retry) {
                /* never request more entries than wcbuf can hold */
                const int want = static_cast<int>(
                    remain < max_expected_replicas ? remain : max_expected_replicas);
                const int c = ibv_poll_cq(scq, want, wcbuf);    // # of completions this poll
                if (c < 0)
                    [[unlikely]] return c;
                if (!c)
                    continue;
                for (int i = 0; i < c; ++i) {
                    const auto &cwc = wcbuf[i];
                    if (cwc.status != IBV_WC_SUCCESS) [[unlikely]] {
                        /* hand the failed completion back to the caller */
                        std::memcpy(&wc, &cwc, sizeof(wc));
                        return -ECANCELED;
                    }
                }
                remain -= static_cast<unsigned>(c);
            }
            if (remain)
                [[unlikely]] return -ETIME;
        }
        else {
            for (const auto &t : targets) {
                int r = 0;  // stays 0 (timeout) if no poll ever yields a completion
                for (unsigned retry = max_poll; false || retry; --retry) {
                    r = ibv_poll_cq(t.id->send_cq, 1, &wc);
                    if (!r)
                        continue;
                    if (r < 0)
                        [[unlikely]] return r;
                    if (wc.status != IBV_WC_SUCCESS)
                        [[unlikely]] return -ECANCELED;
                    break;
                }
                if (!r)
                    [[unlikely]] return -ETIME;
            }
        }

        return 0;
    }

    /** Run the op on this object's own work-request chain via Base::perform(). */
    int perform(void) const override
    {
        return Base::perform(wr);
    }
    using Base::operator();

};  /* class WriteAPM */

}   /* namespace ops */
}   /* namespace gestalt */
